package com.fanghuaiming.notify.account.mqservice;

import com.alibaba.fastjson.JSON;
import com.fanghuaiming.notify.account.common.NotifyAccountCache;
import com.fanghuaiming.notify.account.service.NotityAccountInfoService;
import com.fanghuaiming.notify.common.model.common.NotifyAccountChange;
import com.fanghuaiming.notify.common.model.enumeration.PayTransactionEnum;
import com.fanghuaiming.notify.common.model.pay.NotifyPay;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/****
 * @description: 消息接受方
 * @version:1.0.0
 * @author fanghuaiming
 * @data Created in 2020/12/3 下午5:24
 *
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.account.accept}" , consumerGroup = "${rocketmq.comsumer.group}")
public class NotifyPayListener implements RocketMQListener<NotifyPay> {

    /**
     * 账户微服务
     */
    @Autowired
    private NotityAccountInfoService notityAccountInfoService;

    /**
     * @Description: 接收消息
     *
     * @param:
     * @return:
     * @auther: fanghuaiming
     * @date: 2020/12/3 下午5:29
     * @version:1.0.0
     */
    @Override
    public void onMessage(NotifyPay notifyPay) {
        //先保存到全局缓存供多线程调用
        if(NotifyAccountCache.transactionIdMap.containsKey(notifyPay.getTransactionId())){
            //使用原子类进行加减
            AtomicInteger atomicInteger = NotifyAccountCache.transactionIdMap.get(notifyPay.getTransactionId());
            if(atomicInteger.get() > 2){
                //大于5次代表之前5次都已经失败了,这时候应该进行人工干预了
                log.error("====== 充值完成后账户始终无法到账,已进行报警请及时进行人工处理退款或充值:{} ======",notifyPay);
                NotifyAccountCache.transactionIdMap.remove(notifyPay.getTransactionId());
                return;
            }
            atomicInteger.incrementAndGet();
        }else {
            NotifyAccountCache.transactionIdMap.put(notifyPay.getTransactionId(),new AtomicInteger(1));
        }
        log.info("====== 接收到消息:{} ======", JSON.toJSONString(notifyPay));
        if (PayTransactionEnum.SUCCESS.getMessage().equals(notifyPay.getResult())) {
            //更新账户金额
            log.info("====== 准备更新账户余额 ======");
            NotifyAccountChange notifyAccountChange = NotifyAccountChange
                    .builder()
                    .accountNo(notifyPay.getAccountNo())
                    .amount(notifyPay.getPayAmount())
                    .transactionId(notifyPay.getTransactionId())
                    .build();
            try {
                notityAccountInfoService.updateNotifyAccountBalance(notifyAccountChange);
                log.info("====== 账户余额已更新完成 ======");
                NotifyAccountCache.transactionIdMap.remove(notifyPay.getTransactionId());
            } catch (Exception e) {
                log.info("====== 账户余额已更新事务失败,下一分钟由定时失败事务处理器进行处理,出错原因:{}/{} ======",e.getMessage(),e.getCause());
            }

        }
    }
}
